{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "84b22f68",
   "metadata": {},
   "source": [
    "# @produces basics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7a35b05c",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "\n",
    "import platform\n",
    "from typing import *\n",
    "\n",
    "from IPython.display import Markdown as md\n",
    "\n",
    "from fastkafka._components._subprocess import terminate_asyncio_process\n",
    "from fastkafka._testing.apache_kafka_broker import run_and_match\n",
    "from fastkafka.testing import ApacheKafkaBroker, run_script_and_cancel"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8c655e4f",
   "metadata": {},
   "source": [
    "You can use `@produces` decorator to produce messages to Kafka topics. \n",
    "\n",
    "In this guide we will create a simple FastKafka app that will produce hello world messages to hello_world topic."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "18535f2f",
   "metadata": {},
   "source": [
    "## Import `FastKafka`"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b4ef969d",
   "metadata": {},
   "source": [
    "To use the `@produces` decorator, frist we need to import the base FastKafka app to create our application."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3d6a8fae",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "from fastkafka import FastKafka\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "import_fastkafka = \"\"\"from fastkafka import FastKafka\n",
    "\"\"\"\n",
    "\n",
    "md(f\"```python\\n{import_fastkafka}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "09cb37e9",
   "metadata": {},
   "source": [
    "## Define the structure of the messages\n",
    "Next, you need to define the structure of the messages you want to send to the topic using [pydantic](https://docs.pydantic.dev/). For the guide we'll stick to something basic, but you are free to define any complex message structure you wish in your project, just make sure it can be JSON encoded.\n",
    "\n",
    "Let's import `BaseModel` and `Field` from pydantic and create a simple `HelloWorld` class containing one string parameter `msg`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f83265a1",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "from pydantic import BaseModel, Field\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "import_pydantic = \"\"\"from pydantic import BaseModel, Field\n",
    "\"\"\"\n",
    "md(f\"```python\\n{import_pydantic}\\n```\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9c1e6ec8",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "class HelloWorld(BaseModel):\n",
       "    msg: str = Field(\n",
       "        ...,\n",
       "        example=\"Hello\",\n",
       "        description=\"Demo hello world message\",\n",
       "    )\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "define_HelloWorld = \"\"\"\n",
    "class HelloWorld(BaseModel):\n",
    "    msg: str = Field(\n",
    "        ...,\n",
    "        example=\"Hello\",\n",
    "        description=\"Demo hello world message\",\n",
    "    )\n",
    "\"\"\"\n",
    "md(f\"```python\\n{define_HelloWorld}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fc1d810c",
   "metadata": {},
   "source": [
    "## Create a base FastKafka app\n",
    "\n",
    "Now we will create and define a base FastKafka app, replace the `<url_of_your_kafka_bootstrap_server>` and `<port_of_your_kafka_bootstrap_server>` with the actual values of your Kafka bootstrap server"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2732642f",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "\n",
       "kafka_brokers = {\n",
       "    \"demo_broker\": {\n",
       "        \"url\": \"<url_of_your_kafka_bootstrap_server>\",\n",
       "        \"description\": \"local demo kafka broker\",\n",
       "        \"port\": \"<port_of_your_kafka_bootstrap_server>\",\n",
       "    }\n",
       "}\n",
       "\n",
       "app = FastKafka(kafka_brokers=kafka_brokers)\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "create_app = \"\"\"\n",
    "\n",
    "kafka_brokers = {\n",
    "    \"demo_broker\": {\n",
    "        \"url\": \"<url_of_your_kafka_bootstrap_server>\",\n",
    "        \"description\": \"local demo kafka broker\",\n",
    "        \"port\": \"<port_of_your_kafka_bootstrap_server>\",\n",
    "    }\n",
    "}\n",
    "\n",
    "app = FastKafka(kafka_brokers=kafka_brokers)\n",
    "\"\"\"\n",
    "md(f\"```python\\n{create_app}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6e41ebe6",
   "metadata": {},
   "source": [
    "## Create a producer function and decorate it with `@produces`\n",
    "\n",
    "Let's create a producer function that will produce `HelloWorld` messages to *hello_world* topic:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "93dd7a11",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "@app.produces()\n",
       "async def to_hello_world(msg: str) -> HelloWorld:\n",
       "    return HelloWorld(msg=msg)\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "decorate_produces = \"\"\"\n",
    "@app.produces()\n",
    "async def to_hello_world(msg: str) -> HelloWorld:\n",
    "    return HelloWorld(msg=msg)\n",
    "\"\"\"\n",
    "md(f\"```python\\n{decorate_produces}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4216f0f1",
   "metadata": {},
   "source": [
    "Now you can call your defined function as any normal python function in your code. The side effect of calling the function will be that the value you are returning will also be sent to a kafka topic.\n",
    "\n",
    "By default, the topic is determined from your function name, the \"to_\" prefix is stripped and what is left over is used as a topic name. I  this case, that is *hello_world*."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f02f147e",
   "metadata": {},
   "source": [
    "## Instruct the app to start sending HelloWorld messages\n",
    "\n",
    "Let's use `@run_in_background` decorator to instruct our app to send HelloWorld messages to hello_world topic every second."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "386ce09a",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "import asyncio\n",
       "\n",
       "@app.run_in_background()\n",
       "async def hello_every_second():\n",
       "    while(True):\n",
       "        await to_hello_world(msg=\"Hello world!\")\n",
       "        await asyncio.sleep(1)\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "define_run = \"\"\"\n",
    "import asyncio\n",
    "\n",
    "@app.run_in_background()\n",
    "async def hello_every_second():\n",
    "    while(True):\n",
    "        await to_hello_world(msg=\"Hello world!\")\n",
    "        await asyncio.sleep(1)\n",
    "\"\"\"\n",
    "md(f\"```python\\n{define_run}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b68c95f7",
   "metadata": {},
   "source": [
    "## Final app\n",
    "\n",
    "Your app code should look like this:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cd7d88d3",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "from fastkafka import FastKafka\n",
       "from pydantic import BaseModel, Field\n",
       "\n",
       "class HelloWorld(BaseModel):\n",
       "    msg: str = Field(\n",
       "        ...,\n",
       "        example=\"Hello\",\n",
       "        description=\"Demo hello world message\",\n",
       "    )\n",
       "\n",
       "\n",
       "kafka_brokers = {\n",
       "    \"demo_broker\": {\n",
       "        \"url\": \"<url_of_your_kafka_bootstrap_server>\",\n",
       "        \"description\": \"local demo kafka broker\",\n",
       "        \"port\": \"<port_of_your_kafka_bootstrap_server>\",\n",
       "    }\n",
       "}\n",
       "\n",
       "app = FastKafka(kafka_brokers=kafka_brokers)\n",
       "\n",
       "@app.produces()\n",
       "async def to_hello_world(msg: str) -> HelloWorld:\n",
       "    return HelloWorld(msg=msg)\n",
       "\n",
       "import asyncio\n",
       "\n",
       "@app.run_in_background()\n",
       "async def hello_every_second():\n",
       "    while(True):\n",
       "        await to_hello_world(msg=\"Hello world!\")\n",
       "        await asyncio.sleep(1)\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "produces_example = (\n",
    "    import_fastkafka\n",
    "    + import_pydantic\n",
    "    + define_HelloWorld\n",
    "    + create_app\n",
    "    + decorate_produces\n",
    "    + define_run\n",
    ")\n",
    "md(f\"```python\\n{produces_example}\\n```\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8fa74215",
   "metadata": {},
   "source": [
    "## Run the app"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "afe529ab",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "Now we can run the app. Copy the code above in producer_example.py and run it by running\n",
       "```shell\n",
       "fastkafka run --num-workers=1 --kafka-broker=demo_broker producer_example:app\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "script_file = \"producer_example.py\"\n",
    "cmd = \"fastkafka run --num-workers=1 --kafka-broker=demo_broker producer_example:app\"\n",
    "md(\n",
    "    f\"Now we can run the app. Copy the code above in producer_example.py and run it by running\\n```shell\\n{cmd}\\n```\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e66be2b9",
   "metadata": {},
   "source": [
    "After running the command, you should see this output in your terminal:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "aad42638",
   "metadata": {},
   "outputs": [],
   "source": [
    "# | hide\n",
    "\n",
    "\n",
    "async def _run_example_app(\n",
    "    *, app_example: str, bootstrap_server: str, script_file: str, cmd: str\n",
    ") -> Tuple[int, str]:\n",
    "    server_url = bootstrap_server.split(\":\")[0]\n",
    "    server_port = bootstrap_server.split(\":\")[1]\n",
    "    exit_code, output = await run_script_and_cancel(\n",
    "        script=app_example.replace(\n",
    "            \"<url_of_your_kafka_bootstrap_server>\", server_url\n",
    "        ).replace(\"<port_of_your_kafka_bootstrap_server>\", server_port),\n",
    "        script_file=script_file,\n",
    "        cmd=cmd,\n",
    "        cancel_after=5,\n",
    "    )\n",
    "    return exit_code, output.decode(\"UTF-8\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "335ec62d",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): entering...\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): (<_UnixSelectorEventLoop running=True closed=False debug=False>) is already running!\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): calling nest_asyncio.apply()\n",
      "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n",
      "[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...\n",
      "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n",
      "[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: <class 'fastkafka.testing.ApacheKafkaBroker'>.start(): returning 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): exited.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): entering...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 84224...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 84224 terminated.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 83864...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 83864 terminated.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): exited.\n"
     ]
    }
   ],
   "source": [
    "# | hide\n",
    "\n",
    "with ApacheKafkaBroker(\n",
    "    topicas=[\"hello_world\"], apply_nest_asyncio=True, listener_port=21092\n",
    ") as bootstrap_server:\n",
    "    exit_code, output = await _run_example_app(\n",
    "        app_example=produces_example,\n",
    "        bootstrap_server=bootstrap_server,\n",
    "        script_file=script_file,\n",
    "        cmd=cmd,\n",
    "    )\n",
    "    expected_returncode = [0, 1]\n",
    "    assert exit_code in expected_returncode, exit_code"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cf40711b",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[84645]: [INFO] fastkafka._application.app: run_in_background() : Adding function 'hello_every_second' as background task\n",
      "[84645]: [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to '127.0.0.1:9092'\n",
      "[84645]: [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'\n",
      "[84645]: [INFO] fastkafka._application.app: _populate_bg_tasks() : Starting background task 'hello_every_second'\n",
      "[84645]: [WARNING] aiokafka.cluster: Topic hello_world is not available during auto-create initialization\n",
      "[84645]: [WARNING] aiokafka.cluster: Topic hello_world is not available during auto-create initialization\n",
      "Starting process cleanup, this may take a few seconds...\n",
      "[INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 84645...\n",
      "[84645]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Cancelling background task 'hello_every_second'\n",
      "[84645]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Waiting for background task 'hello_every_second' to finish\n",
      "[84645]: [INFO] fastkafka._application.app: _shutdown_bg_tasks() : Execution finished for background task 'hello_every_second'\n",
      "[INFO] fastkafka._server: terminate_asyncio_process(): Process 84645 terminated.\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "print(output)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d4ec6dab",
   "metadata": {},
   "source": [
    "## Check if the message was sent to the Kafka topic"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c81e65bc",
   "metadata": {},
   "source": [
    "Lets check the topic and see if there is a \"Hello world!\" message in the hello_world topic. In your terminal run:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6ef181f6",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```shell\n",
       "kafka-console-consumer.sh -topic=hello_world --from-beginning -bootstrap-server=<addr_of_your_kafka_bootstrap_server>\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: false\n",
    "\n",
    "script_extension = \".bat\" if platform.system() == \"Windows\" else \".sh\"\n",
    "consumer_cmd = f\"kafka-console-consumer{script_extension} -topic=hello_world --from-beginning -bootstrap-server=<addr_of_your_kafka_bootstrap_server>\"\n",
    "md(f\"```shell\\n{consumer_cmd}\\n```\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a66904c8",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): entering...\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): (<_UnixSelectorEventLoop running=True closed=False debug=False>) is already running!\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): calling nest_asyncio.apply()\n",
      "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n",
      "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n",
      "stdout=, stderr=, returncode=1\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: zookeeper startup falied, generating a new port and retrying...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: port=34095\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: <class 'fastkafka.testing.ApacheKafkaBroker'>.start(): returning 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): exited.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 88797...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 88797 terminated.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): entering...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 87215...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 87215 terminated.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 86502...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 86502 terminated.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): exited.\n"
     ]
    }
   ],
   "source": [
    "# | hide\n",
    "\n",
    "\n",
    "produces_example = (\n",
    "    import_fastkafka\n",
    "    + import_pydantic\n",
    "    + define_HelloWorld\n",
    "    + create_app\n",
    "    + decorate_produces\n",
    "    + define_run\n",
    ")\n",
    "\n",
    "with ApacheKafkaBroker(\n",
    "    topics=[\"hello_world\"], apply_nest_asyncio=True, listener_port=21092\n",
    ") as bootstrap_server:\n",
    "    exit_code, output = await _run_example_app(\n",
    "        app_example=produces_example,\n",
    "        bootstrap_server=bootstrap_server,\n",
    "        script_file=script_file,\n",
    "        cmd=cmd,\n",
    "    )\n",
    "\n",
    "    expected_returncode = [0, 1]\n",
    "    assert exit_code in expected_returncode, exit_code\n",
    "\n",
    "    proc = await run_and_match(\n",
    "        *consumer_cmd.replace(\n",
    "            \"<addr_of_your_kafka_bootstrap_server>\", bootstrap_server\n",
    "        ).split(\" \"),\n",
    "        pattern='{\"msg\":\"Hello world!\"}',\n",
    "        timeout=30,\n",
    "    )\n",
    "\n",
    "    await terminate_asyncio_process(proc)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9292901c",
   "metadata": {},
   "source": [
    "You should see the {\"msg\": \"Hello world!\"} messages in your topic."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "49b9c46e",
   "metadata": {},
   "source": [
    "## Choosing a topic"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ed9e5f0c",
   "metadata": {},
   "source": [
    "You probably noticed that you didn't define which topic you are sending the message to, this is because the `@produces` decorator determines the topic by default from your function name.\n",
    "The decorator will take your function name and strip the default \"to_\" prefix from it and use the rest as the topic name. In this example case, the topic is *hello_world*."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4938a558",
   "metadata": {},
   "source": [
    "!!! warn \\\"New topics\\\"\n",
    "\n",
    "    Kafka producers and application startup will fail if the topics you are producing to don't yet exist. Before running the app, make sure that the topics are created."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "dc3e9f8a",
   "metadata": {},
   "source": [
    "You can choose your custom prefix by defining the `prefix` parameter in produces decorator, like this:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "da734ca4",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "@app.produces(prefix=\"send_to_\")\n",
       "async def send_to_hello_world(msg: str) -> HelloWorld:\n",
       "    return HelloWorld(msg=msg)\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: False\n",
    "decorate_produces_prefix = \"\"\"\n",
    "@app.produces(prefix=\"send_to_\")\n",
    "async def send_to_hello_world(msg: str) -> HelloWorld:\n",
    "    return HelloWorld(msg=msg)\n",
    "\"\"\"\n",
    "md(f\"```python\\n{decorate_produces_prefix}\\n```\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b7563b45",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): entering...\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): (<_UnixSelectorEventLoop running=True closed=False debug=False>) is already running!\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): calling nest_asyncio.apply()\n",
      "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n",
      "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: <class 'fastkafka.testing.ApacheKafkaBroker'>.start(): returning 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): exited.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 90304...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 90304 terminated.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): entering...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 89536...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 89536 terminated.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 89174...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 89174 terminated.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): exited.\n"
     ]
    }
   ],
   "source": [
    "# | hide\n",
    "\n",
    "\n",
    "produces_example = (\n",
    "    import_fastkafka\n",
    "    + import_pydantic\n",
    "    + define_HelloWorld\n",
    "    + create_app\n",
    "    + decorate_produces_prefix\n",
    "    + define_run.replace(\"to_hello_world\", \"send_to_hello_world\")\n",
    ")\n",
    "\n",
    "with ApacheKafkaBroker(\n",
    "    topics=[\"hello_world\"], apply_nest_asyncio=True, listener_port=21092\n",
    ") as bootstrap_server:\n",
    "    exit_code, output = await _run_example_app(\n",
    "        app_example=produces_example,\n",
    "        bootstrap_server=bootstrap_server,\n",
    "        script_file=script_file,\n",
    "        cmd=cmd,\n",
    "    )\n",
    "\n",
    "    expected_returncode = [0, 1]\n",
    "    assert exit_code in expected_returncode, exit_code\n",
    "\n",
    "    proc = await run_and_match(\n",
    "        *consumer_cmd.replace(\n",
    "            \"<addr_of_your_kafka_bootstrap_server>\", bootstrap_server\n",
    "        ).split(\" \"),\n",
    "        pattern='{\"msg\":\"Hello world!\"}',\n",
    "        timeout=30,\n",
    "    )\n",
    "\n",
    "    await terminate_asyncio_process(proc)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0c4225dd",
   "metadata": {},
   "source": [
    "Also, you can define the topic name completely by defining the `topic` in parameter in produces decorator, like this:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9bf881b6",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/markdown": [
       "```python\n",
       "\n",
       "@app.produces(topic=\"my_special_topic\")\n",
       "async def to_hello_world(msg: str) -> HelloWorld:\n",
       "    return HelloWorld(msg=msg)\n",
       "\n",
       "```"
      ],
      "text/plain": [
       "<IPython.core.display.Markdown object>"
      ]
     },
     "execution_count": null,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# | echo: False\n",
    "decorate_produces_topic = \"\"\"\n",
    "@app.produces(topic=\"my_special_topic\")\n",
    "async def to_hello_world(msg: str) -> HelloWorld:\n",
    "    return HelloWorld(msg=msg)\n",
    "\"\"\"\n",
    "md(f\"```python\\n{decorate_produces_topic}\\n```\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "612baa8c",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): entering...\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): (<_UnixSelectorEventLoop running=True closed=False debug=False>) is already running!\n",
      "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): calling nest_asyncio.apply()\n",
      "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n",
      "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: <class 'fastkafka.testing.ApacheKafkaBroker'>.start(): returning 127.0.0.1:9092\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): exited.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 91793...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 91793 terminated.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): entering...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 91026...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 91026 terminated.\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 90665...\n",
      "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 90665 terminated.\n",
      "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): exited.\n"
     ]
    }
   ],
   "source": [
    "# | hide\n",
    "\n",
    "produces_example = (\n",
    "    import_fastkafka\n",
    "    + import_pydantic\n",
    "    + define_HelloWorld\n",
    "    + create_app\n",
    "    + decorate_produces_topic\n",
    "    + define_run\n",
    ")\n",
    "\n",
    "with ApacheKafkaBroker(\n",
    "    topics=[\"my_special_topic\"], apply_nest_asyncio=True, listener_port=21092\n",
    ") as bootstrap_server:\n",
    "    exit_code, output = await _run_example_app(\n",
    "        app_example=produces_example,\n",
    "        bootstrap_server=bootstrap_server,\n",
    "        script_file=script_file,\n",
    "        cmd=cmd,\n",
    "    )\n",
    "\n",
    "    expected_returncode = [0, 1]\n",
    "    assert exit_code in expected_returncode, exit_code\n",
    "\n",
    "    proc = await run_and_match(\n",
    "        *consumer_cmd.replace(\"<addr_of_your_kafka_bootstrap_server>\", bootstrap_server)\n",
    "        .replace(\"hello_world\", \"my_special_topic\")\n",
    "        .split(\" \"),\n",
    "        pattern='{\"msg\":\"Hello world!\"}',\n",
    "        timeout=30,\n",
    "    )\n",
    "\n",
    "    await terminate_asyncio_process(proc)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "da50972c",
   "metadata": {},
   "source": [
    "## Message data"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7e4e8052",
   "metadata": {},
   "source": [
    "The return value from your function will be translated JSON string and then to bytes and sent to defined Kafka topic. The typing of the return value is used for generating the documentation for your Kafka app.\n",
    "\n",
    "In this example case, the return value is HelloWorld class which will be translated into JSON formatted string and then to bytes. The translated data will then be sent to Kafka. In the from of: `b'{\"msg\":\"Hello world!\"}'`"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "python3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
